/*-------------------------------------------------------------------------
 *
 * hash_xlog.c
 *	  WAL replay logic for hash index.
 *
 *
 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/hash/hash_xlog.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include "access/bufmask.h"
#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/transam.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "miscadmin.h"
#include "storage/procarray.h"
#include "storage/buf_internals.h"


/*
 * replay a hash index meta page
 */
/*
 * replay a hash index meta page
 *
 * Rebuilds the metapage from the parameters carried in the WAL record,
 * then flushes it immediately if it belongs to an init fork.
 */
static void
hash_xlog_init_meta_page(XLogReaderState *record)
{
	xl_hash_init_meta_page *xlrec = (xl_hash_init_meta_page *) XLogRecGetData(record);
	XLogRecPtr	end_lsn = record->EndRecPtr;
	Buffer		buf;
	Page		metapage;
	ForkNumber	fork;

	/* Re-create the index metapage from the logged parameters. */
	buf = XLogInitBufferForRedo(record, 0);
	Assert(BufferIsValid(buf));
	_hash_init_metabuffer(buf, xlrec->num_tuples, xlrec->procid,
						  xlrec->ffactor, true);

	metapage = (Page) BufferGetPage(buf);
	PageSetLSN(metapage, end_lsn);
	MarkBufferDirty(buf);

	/*
	 * Init-fork pages must be forced to disk so the on-disk state stays in
	 * sync with shared buffers; create-index operations don't log a full
	 * page image of the metapage.  See XLogReadBufferForRedoExtended.
	 */
	XLogRecGetBlockTag(record, 0, NULL, &fork, NULL);
	if (fork == INIT_FORKNUM)
		FlushOneBuffer(buf);

	/* all done */
	UnlockReleaseBuffer(buf);
}

/*
 * replay a hash index bitmap page
 */
/*
 * replay a hash index bitmap page
 *
 * In this He3 variant each WAL record references a single page at block
 * index 0, and the record's block number selects which logical page of the
 * original operation is being replayed:
 *   0 - initialize the new bitmap page
 *   1 - register the bitmap page in the metapage's list of bitmaps
 * NOTE(review): mapping inferred from the switch below — confirm against
 * the He3 WAL writer.
 */
static void
he3hash_xlog_init_bitmap_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		bitmapbuf;
	Buffer		metabuf;
	Page		page;
	HashMetaPage metap;
	uint32		num_buckets;
	ForkNumber	forknum;

	uint8 blocknum = XLogRecGetBlockNum(record);

	switch(blocknum) {
		case 0:
		{
			xl_hash_init_bitmap_page *xlrec = (xl_hash_init_bitmap_page *) XLogRecGetData(record);

			/*
			 * Initialize bitmap page
			 */
			bitmapbuf = XLogInitBufferForRedo(record, 0);
			_hash_initbitmapbuffer(bitmapbuf, xlrec->bmsize, true);
			PageSetLSN(BufferGetPage(bitmapbuf), lsn);
			MarkBufferDirty(bitmapbuf);

			/*
			 * Force the on-disk state of init forks to always be in sync with the
			 * state in shared buffers.  See XLogReadBufferForRedoExtended.  We need
			 * special handling for init forks as create index operations don't log a
			 * full page image of the metapage.
			 */
			XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
			if (forknum == INIT_FORKNUM)
				FlushOneBuffer(bitmapbuf);
			UnlockReleaseBuffer(bitmapbuf);
			break;
		}
		case 1:
			/* add the new bitmap page to the metapage's list of bitmaps */
			if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
			{
				/*
				 * Note: in normal operation, we'd update the metapage while still
				 * holding lock on the bitmap page.  But during replay it's not
				 * necessary to hold that lock, since nobody can see it yet; the
				 * creating transaction hasn't yet committed.
				 */
				page = BufferGetPage(metabuf);
				metap = HashPageGetMeta(page);

				/*
				 * The bitmap page follows the initial bucket pages: block 0
				 * is the metapage and blocks 1..num_buckets are buckets, so
				 * the bitmap lives at block num_buckets + 1.
				 */
				num_buckets = metap->hashm_maxbucket + 1;
				metap->hashm_mapp[metap->hashm_nmaps] = num_buckets + 1;
				metap->hashm_nmaps++;

				PageSetLSN(page, lsn);
				MarkBufferDirty(metabuf);

				/* keep init-fork pages durable; same rationale as case 0 */
				XLogRecGetBlockTag(record, 0, NULL, &forknum, NULL);
				if (forknum == INIT_FORKNUM)
					FlushOneBuffer(metabuf);
			}
			if (BufferIsValid(metabuf))
				UnlockReleaseBuffer(metabuf);
			break;
	}
}

/*
 * replay a hash index insert without split
 */
/*
 * replay a hash index insert without split
 *
 * Block 0 re-inserts the logged tuple into the bucket/overflow page;
 * block 1 bumps the tuple count on the metapage.
 */
static void
he3hash_xlog_insert(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_insert *xlrec = (xl_hash_insert *) XLogRecGetData(record);
	uint8		which = XLogRecGetBlockNum(record);
	Buffer		buf;
	Page		pg;

	if (which == 0)
	{
		if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
		{
			Size		itemlen;
			char	   *itemdata = XLogRecGetBlockData(record, 0, &itemlen);

			pg = BufferGetPage(buf);

			if (PageAddItem(pg, (Item) itemdata, itemlen, xlrec->offnum,
							false, false) == InvalidOffsetNumber)
				elog(PANIC, "hash_xlog_insert: failed to add item");

			PageSetLSN(pg, lsn);
			MarkBufferDirty(buf);
		}
		if (BufferIsValid(buf))
			UnlockReleaseBuffer(buf);
	}
	else if (which == 1)
	{
		if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
		{
			HashMetaPage meta;

			/*
			 * Note: in normal operation, we'd update the metapage while still
			 * holding lock on the page we inserted into.  But during replay
			 * it's not necessary to hold that lock, since no other index
			 * updates can be happening concurrently.
			 */
			pg = BufferGetPage(buf);
			meta = HashPageGetMeta(pg);
			meta->hashm_ntuples += 1;

			PageSetLSN(pg, lsn);
			MarkBufferDirty(buf);
		}
		if (BufferIsValid(buf))
			UnlockReleaseBuffer(buf);
	}
}

/*
 * replay addition of overflow page for hash index
 */
/*
 * replay addition of overflow page for hash index
 *
 * Each record references one page at block index 0; the record's block
 * number selects the logical page of the operation:
 *   0 - the new overflow page
 *   1 - the previous page in the bucket chain (backlink update)
 *   2 - the bitmap page (mark the overflow page allocated)
 *   3 - a newly created bitmap page, if one was needed
 *   4 - the metapage (firstfree / spares bookkeeping)
 * NOTE(review): mapping inferred from the switch below — confirm against
 * the He3 WAL writer.
 */
static void
he3hash_xlog_add_ovfl_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_add_ovfl_page *xlrec = (xl_hash_add_ovfl_page *) XLogRecGetData(record);
	Buffer		leftbuf;
	Buffer		ovflbuf;
	Buffer		metabuf;
	BlockNumber leftblk;
	BlockNumber rightblk;
	BlockNumber newmapblk = InvalidBlockNumber;
	Page		ovflpage;
	HashPageOpaque ovflopaque;
	uint32	   *num_bucket;
	char	   *data;
	Size		datalen PG_USED_FOR_ASSERTS_ONLY;
	bool		new_bmpage = false;
	
	uint8 blocknum = XLogRecGetBlockNum(record);
	switch(blocknum) {
		case 0:
			/* initialize the new overflow page and set its backlink */
			ovflbuf = XLogInitBufferForRedo(record, 0);
			Assert(BufferIsValid(ovflbuf));

			/* block data carries the left sibling blkno and bucket number */
			data = XLogRecGetBlockData(record, 0, &datalen);
			memcpy(&leftblk,data,sizeof(BlockNumber));
			num_bucket = (uint32 *) (data+sizeof(BlockNumber));
			Assert(datalen == sizeof(uint32)+sizeof(BlockNumber));
			_hash_initbuf(ovflbuf, InvalidBlockNumber, *num_bucket, LH_OVERFLOW_PAGE,
						  true);
			/* update backlink */
			ovflpage = BufferGetPage(ovflbuf);
			ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);
			//XLogRecGetBlockTag(record, 1, NULL, NULL, &leftblk);
			ovflopaque->hasho_prevblkno = leftblk;

			PageSetLSN(ovflpage, lsn);
			MarkBufferDirty(ovflbuf);
			UnlockReleaseBuffer(ovflbuf);
			break;
		case 1:
			/* link the left sibling forward to the new overflow page */
			if (XLogReadBufferForRedo(record, 0, &leftbuf) == BLK_NEEDS_REDO)
			{
				Page		leftpage;
				HashPageOpaque leftopaque;
		
				leftpage = BufferGetPage(leftbuf);
				leftopaque = (HashPageOpaque) PageGetSpecialPointer(leftpage);
				/* block data carries the new overflow page's blkno */
				data = XLogRecGetBlockData(record, 0, &datalen);
				memcpy(&rightblk,data,sizeof(BlockNumber));
				//XLogRecGetBlockTag(record, 0, NULL, NULL, &rightblk);
				leftopaque->hasho_nextblkno = rightblk;
		
				PageSetLSN(leftpage, lsn);
				MarkBufferDirty(leftbuf);
			}
			if (BufferIsValid(leftbuf))
				UnlockReleaseBuffer(leftbuf);
			break;
		case 2:
			/*
			 * Note: in normal operation, we'd update the bitmap and meta page while
			 * still holding lock on the overflow pages.  But during replay it's not
			 * necessary to hold those locks, since no other index updates can be
			 * happening concurrently.
			 */
			if (XLogRecHasBlockRef(record, 0))
			{
				Buffer		mapbuffer;
		
				if (XLogReadBufferForRedo(record, 0, &mapbuffer) == BLK_NEEDS_REDO)
				{
					Page		mappage = (Page) BufferGetPage(mapbuffer);
					uint32	   *freep = NULL;
					char	   *data;
					uint32	   *bitmap_page_bit;
		
					freep = HashPageGetBitmap(mappage);
		
					/* block data carries the bit index to set */
					data = XLogRecGetBlockData(record, 0, &datalen);
					bitmap_page_bit = (uint32 *) data;
		
					SETBIT(freep, *bitmap_page_bit);
		
					PageSetLSN(mappage, lsn);
					MarkBufferDirty(mapbuffer);
				}
				if (BufferIsValid(mapbuffer))
					UnlockReleaseBuffer(mapbuffer);
			}
			break;
		case 3:
			/* initialize a brand-new bitmap page, if the record has one */
			if (XLogRecHasBlockRef(record, 0))
			{
				Buffer		newmapbuf;
		
				newmapbuf = XLogInitBufferForRedo(record, 0);
		
				_hash_initbitmapbuffer(newmapbuf, xlrec->bmsize, true);
		
				new_bmpage = true;
				newmapblk = BufferGetBlockNumber(newmapbuf);
		
				MarkBufferDirty(newmapbuf);
				PageSetLSN(BufferGetPage(newmapbuf), lsn);
		
				UnlockReleaseBuffer(newmapbuf);
			}
			break;
		case 4:
			/* update metapage bookkeeping (firstfree, spares, bitmap list) */
			if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
			{
				HashMetaPage metap;
				Page		page;
				uint32	   *firstfree_ovflpage;

				data = XLogRecGetBlockData(record, 0, &datalen);
				firstfree_ovflpage = (uint32 *) data;

				page = BufferGetPage(metabuf);
				metap = HashPageGetMeta(page);
				metap->hashm_firstfree = *firstfree_ovflpage;

				if (!xlrec->bmpage_found)
				{
					metap->hashm_spares[metap->hashm_ovflpoint]++;

					/*
					 * NOTE(review): new_bmpage/newmapblk are function-local
					 * and only set in case 3, which executes in a separate
					 * invocation (one record per page).  This branch
					 * therefore appears unreachable here — confirm whether
					 * the metapage record is expected to carry the new
					 * bitmap block number itself.
					 */
					if (new_bmpage)
					{
						Assert(BlockNumberIsValid(newmapblk));

						metap->hashm_mapp[metap->hashm_nmaps] = newmapblk;
						metap->hashm_nmaps++;
						metap->hashm_spares[metap->hashm_ovflpoint]++;
					}
				}

				PageSetLSN(page, lsn);
				MarkBufferDirty(metabuf);
			}
			if (BufferIsValid(metabuf))
				UnlockReleaseBuffer(metabuf);
			break;
	}
}

/*
 * replay allocation of page for split operation
 */
/*
 * replay allocation of page for split operation
 *
 * Record block numbers: 0 = old bucket page, 1 = new bucket page,
 * 2 = metapage.  For the old bucket, if the record's attached buffer
 * (record->tag/record->buffer) is the very page being replayed, it is
 * reused in RBM_NORMAL_VALID mode and must not be unlocked here — the
 * caller still owns that buffer.
 * NOTE(review): RBM_NORMAL_VALID reuse protocol inferred from the tag
 * comparison below; confirm against the He3 redo driver.
 */
static void
he3hash_xlog_split_allocate_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_split_allocate_page *xlrec = (xl_hash_split_allocate_page *) XLogRecGetData(record);
	Buffer		oldbuf = InvalidBuffer;
	Buffer		newbuf;
	Buffer		metabuf;
	Size		datalen PG_USED_FOR_ASSERTS_ONLY;
	char	   *data;
	XLogRedoAction action;
	ReadBufferMode mode = RBM_NORMAL;
	BufferTag *tag = record->tag;
	BufferTag oldtag;
	RelFileNode rnode;
	BlockNumber blkno;
	ForkNumber forknum;
	uint8 blocknum = XLogRecGetBlockNum(record);
	switch(blocknum) {
		case 0:
			
			/*
			 * To be consistent with normal operation, here we take cleanup locks on
			 * both the old and new buckets even though there can't be any concurrent
			 * inserts.
			 */
			/* detect whether the record's own buffer is the old bucket page */
			XLogRecGetBlockTag(record, 0, &rnode, &forknum, &blkno);
			memcpy(&(oldtag.rnode),&rnode,sizeof(rnode));
			oldtag.forkNum = forknum;
			oldtag.blockNum = blkno;
			if (tag!= NULL && BUFFERTAGS_EQUAL(*tag,oldtag)) {
				mode = RBM_NORMAL_VALID;
				oldbuf = record->buffer;
			} 
			
			/* replay the record for old bucket */
			action = XLogReadBufferForRedoExtended(record, 0, mode, true, &oldbuf);
		
			/*
			 * Note that we still update the page even if it was restored from a full
			 * page image, because the special space is not included in the image.
			 */
			if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
			{
				Page		oldpage;
				HashPageOpaque oldopaque;
		
				oldpage = BufferGetPage(oldbuf);
				oldopaque = (HashPageOpaque) PageGetSpecialPointer(oldpage);
		
				oldopaque->hasho_flag = xlrec->old_bucket_flag;
				oldopaque->hasho_prevblkno = xlrec->new_bucket;
		
				PageSetLSN(oldpage, lsn);
				MarkBufferDirty(oldbuf);
			}
			/*
			 * We can release the lock on old bucket early as well but doing here to
			 * consistent with normal operation.  A buffer reused from the record
			 * (RBM_NORMAL_VALID) stays locked for the caller.
			 */
			if (mode != RBM_NORMAL_VALID && BufferIsValid(oldbuf))
				UnlockReleaseBuffer(oldbuf);
			break;
		case 1:
			
		/* replay the record for new bucket */
			newbuf = XLogInitBufferForRedo(record, 0);
			_hash_initbuf(newbuf, xlrec->new_bucket, xlrec->new_bucket,
						  xlrec->new_bucket_flag, true);
			if (!IsBufferCleanupOK(newbuf))
				elog(PANIC, "hash_xlog_split_allocate_page: failed to acquire cleanup lock");
			MarkBufferDirty(newbuf);
			PageSetLSN(BufferGetPage(newbuf), lsn);
		
			if (BufferIsValid(newbuf))
				UnlockReleaseBuffer(newbuf);
			break;
		case 2:
			/*
			 * Note: in normal operation, we'd update the meta page while still
			 * holding lock on the old and new bucket pages.  But during replay it's
			 * not necessary to hold those locks, since no other bucket splits can be
			 * happening concurrently.
			 */

			/* replay the record for metapage changes */
			if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
			{
				Page		page;
				HashMetaPage metap;

				page = BufferGetPage(metabuf);
				metap = HashPageGetMeta(page);
				metap->hashm_maxbucket = xlrec->new_bucket;

				data = XLogRecGetBlockData(record, 0, &datalen);

				if (xlrec->flags & XLH_SPLIT_META_UPDATE_MASKS)
				{
					uint32		lowmask;
					uint32	   *highmask;

					/* extract low and high masks. */
					memcpy(&lowmask, data, sizeof(uint32));
					highmask = (uint32 *) ((char *) data + sizeof(uint32));

					/* update metapage */
					metap->hashm_lowmask = lowmask;
					metap->hashm_highmask = *highmask;

					data += sizeof(uint32) * 2;
				}

				if (xlrec->flags & XLH_SPLIT_META_UPDATE_SPLITPOINT)
				{
					uint32		ovflpoint;
					uint32	   *ovflpages;

					/* extract information of overflow pages. */
					memcpy(&ovflpoint, data, sizeof(uint32));
					ovflpages = (uint32 *) ((char *) data + sizeof(uint32));

					/* update metapage */
					metap->hashm_spares[ovflpoint] = *ovflpages;
					metap->hashm_ovflpoint = ovflpoint;
				}

				MarkBufferDirty(metabuf);
				PageSetLSN(BufferGetPage(metabuf), lsn);
			}

			if (BufferIsValid(metabuf))
				UnlockReleaseBuffer(metabuf);
			break;
	}
}

/*
 * replay of split operation
 */
/*
 * replay of split operation
 *
 * The split record always carries a full-page image; restoring it is all
 * the replay work there is.
 */
static void
hash_xlog_split_page(XLogReaderState *record)
{
	Buffer		buffer;

	if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
		elog(ERROR, "Hash split record did not contain a full-page image");

	UnlockReleaseBuffer(buffer);
}

/*
 * replay completion of split operation
 */
/*
 * replay completion of split operation
 *
 * Block 0 is the old bucket page, block 1 the new one; in both cases the
 * replay work is identical except for which flag value is installed, so
 * the two paths are handled by one body.
 */
static void
he3hash_xlog_split_complete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_split_complete *xlrec = (xl_hash_split_complete *) XLogRecGetData(record);
	uint8		which = XLogRecGetBlockNum(record);
	Buffer		buf;
	XLogRedoAction action;

	if (which != 0 && which != 1)
		return;

	/* replay the record for the old (0) or new (1) bucket */
	action = XLogReadBufferForRedo(record, 0, &buf);

	/*
	 * Note that we still update the page even if it was restored from a full
	 * page image, because the bucket flag is not included in the image.
	 */
	if (action == BLK_NEEDS_REDO || action == BLK_RESTORED)
	{
		Page		pg = BufferGetPage(buf);
		HashPageOpaque opaque = (HashPageOpaque) PageGetSpecialPointer(pg);

		opaque->hasho_flag = (which == 0) ? xlrec->old_bucket_flag
										  : xlrec->new_bucket_flag;

		PageSetLSN(pg, lsn);
		MarkBufferDirty(buf);
	}
	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}

/*
 * replay move of page contents for squeeze operation of hash index
 */
/*
 * replay move of page contents for squeeze operation of hash index
 *
 * Record block numbers: 0 = primary bucket page (cleanup lock only),
 * 1 = write page receiving the moved tuples, 2 = overflow page whose
 * tuples are deleted.
 *
 * If the page being replayed is the buffer already attached to the record
 * (record->tag matches block 0's tag), it is reused in RBM_NORMAL_VALID
 * mode and must NOT be unlocked/released here — the caller owns it.
 */
static void
he3hash_xlog_move_page_contents(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_move_page_contents *xldata = (xl_hash_move_page_contents *) XLogRecGetData(record);
	Buffer		bucketbuf = InvalidBuffer;
	Buffer		writebuf = InvalidBuffer;
	Buffer		deletebuf = InvalidBuffer;
	XLogRedoAction action;
	RelFileNode rnode;
	BlockNumber blkno;
	ForkNumber forknum;
	XLogRecGetBlockTag(record, 0, &rnode, &forknum, &blkno);

	ReadBufferMode mode = RBM_NORMAL;
	BufferTag *tag = record->tag;
	BufferTag oldtag;
	memcpy(&(oldtag.rnode), &rnode, sizeof(rnode));
	oldtag.forkNum = forknum;
	oldtag.blockNum = blkno;
	uint8 blocknum = XLogRecGetBlockNum(record);

	/* Is the record's own buffer the page we're about to replay? */
	if (tag != NULL && BUFFERTAGS_EQUAL(*tag, oldtag)) {
		mode = RBM_NORMAL_VALID;
		if (blocknum == 0) {
			bucketbuf = record->buffer;
		} else if (blocknum == 1) {
			writebuf = record->buffer;
		}
	}

	/*
	 * Ensure we have a cleanup lock on primary bucket page before we start
	 * with the actual replay operation.  This is to ensure that neither a
	 * scan can start nor a scan can be already-in-progress during the replay
	 * of this operation.  If we allow scans during this operation, then they
	 * can miss some records or show the same record multiple times.
	 */
	switch (blocknum) {
		case 0:
			if (!xldata->is_prim_bucket_same_wrt) {
				/*
				 * we don't care for return value as the purpose of reading bucketbuf
				 * is to ensure a cleanup lock on primary bucket page.
				 */
				(void) XLogReadBufferForRedoExtended(record, 0, mode, true, &bucketbuf);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(bucketbuf))
				UnlockReleaseBuffer(bucketbuf);
			break;
		case 1:
			if (xldata->is_prim_bucket_same_wrt)
			{
				/*
				 * BUGFIX: pass the computed mode (was hard-coded RBM_NORMAL).
				 * When the record's own buffer is the write page
				 * (mode == RBM_NORMAL_VALID), reusing it avoids acquiring a
				 * second pin/lock that the cleanup below would never release.
				 * Matches he3hash_xlog_squeeze_page().
				 */
				action = XLogReadBufferForRedoExtended(record, 0, mode, true, &writebuf);
			}
			else
			{
				action = XLogReadBufferForRedo(record, 0, &writebuf);
			}
			/* replay the record for adding entries in overflow buffer */
			if (action == BLK_NEEDS_REDO)
			{
				Page		writepage;
				char	   *begin;
				char	   *data;
				Size		datalen;
				uint16		ninserted = 0;

				data = begin = XLogRecGetBlockData(record, 0, &datalen);

				writepage = (Page) BufferGetPage(writebuf);

				if (xldata->ntups > 0)
				{
					/* block data: array of target offsets, then the tuples */
					OffsetNumber *towrite = (OffsetNumber *) data;

					data += sizeof(OffsetNumber) * xldata->ntups;

					while (data - begin < datalen)
					{
						IndexTuple	itup = (IndexTuple) data;
						Size		itemsz;
						OffsetNumber l;

						itemsz = IndexTupleSize(itup);
						itemsz = MAXALIGN(itemsz);

						data += itemsz;

						l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
						if (l == InvalidOffsetNumber)
							elog(ERROR, "hash_xlog_move_page_contents: failed to add item to hash index page, size %d bytes",
								 (int) itemsz);

						ninserted++;
					}
				}

				/*
				 * number of tuples inserted must be same as requested in REDO record.
				 */
				Assert(ninserted == xldata->ntups);

				PageSetLSN(writepage, lsn);
				MarkBufferDirty(writebuf);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(writebuf))
				UnlockReleaseBuffer(writebuf);
			break;
		case 2:
			/* replay the record for deleting entries from overflow buffer */
			if (XLogReadBufferForRedo(record, 0, &deletebuf) == BLK_NEEDS_REDO)
			{
				Page		page;
				char	   *ptr;
				Size		len;

				ptr = XLogRecGetBlockData(record, 0, &len);

				page = (Page) BufferGetPage(deletebuf);

				if (len > 0)
				{
					OffsetNumber *unused;
					OffsetNumber *unend;

					unused = (OffsetNumber *) ptr;
					unend = (OffsetNumber *) ((char *) ptr + len);

					if ((unend - unused) > 0)
						PageIndexMultiDelete(page, unused, unend - unused);
				}

				PageSetLSN(page, lsn);
				MarkBufferDirty(deletebuf);
			}
			/*
			 * Replay is complete, now we can release the buffers. We release locks at
			 * end of replay operation to ensure that we hold lock on primary bucket
			 * page till end of operation.  We can optimize by releasing the lock on
			 * write buffer as soon as the operation for same is complete, if it is
			 * not same as primary bucket page, but that doesn't seem to be worth
			 * complicating the code.
			 */
			if (BufferIsValid(deletebuf))
				UnlockReleaseBuffer(deletebuf);
			break;
	}
}

/*
 * replay squeeze page operation of hash index
 */
/*
 * replay squeeze page operation of hash index
 *
 * Record block numbers: 0 = primary bucket page (cleanup lock only),
 * 1 = write page receiving the moved tuples, 2 = freed overflow page,
 * 3 = page previous to the freed page, 4 = page next to the freed page,
 * 5 = bitmap page, 6 = metapage.
 * NOTE(review): mapping inferred from the switch below — confirm against
 * the He3 WAL writer.
 *
 * If the page being replayed is the buffer already attached to the record
 * (record->tag matches), it is reused in RBM_NORMAL_VALID mode and must not
 * be unlocked/released here; the caller owns it.
 */
static void
he3hash_xlog_squeeze_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_squeeze_page *xldata = (xl_hash_squeeze_page *) XLogRecGetData(record);
	Buffer		bucketbuf = InvalidBuffer;
	Buffer		writebuf = InvalidBuffer;
	Buffer		ovflbuf;
	Buffer		prevbuf = InvalidBuffer;
	Buffer		mapbuf;
	XLogRedoAction action;
	RelFileNode rnode;
	BlockNumber blkno;
	ForkNumber forknum;
	XLogRecGetBlockTag(record, 0, &rnode, &forknum, &blkno);

	ReadBufferMode mode = RBM_NORMAL;
	BufferTag *tag = record->tag;
	BufferTag oldtag;
	memcpy(&(oldtag.rnode),&rnode,sizeof(rnode));
	oldtag.forkNum = forknum;
	oldtag.blockNum = blkno;
	uint8 blocknum = XLogRecGetBlockNum(record);
	/* reuse the record's own buffer when it is the page being replayed */
	if (tag!= NULL && BUFFERTAGS_EQUAL(*tag,oldtag)) {
		mode = RBM_NORMAL_VALID;
		if (blocknum == 0) {
			bucketbuf = record->buffer;
		} else if (blocknum == 1) {
			writebuf = record->buffer;
		}
	} 
	switch(blocknum) {
		case 0:
			if (!xldata->is_prim_bucket_same_wrt) {
				/*
				 * we don't care for return value as the purpose of reading bucketbuf
				 * is to ensure a cleanup lock on primary bucket page.
				 */
				(void) XLogReadBufferForRedoExtended(record, 0, mode, true, &bucketbuf);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(bucketbuf))
				UnlockReleaseBuffer(bucketbuf);
			break;
		case 1:
			/*
			 * Ensure we have a cleanup lock on primary bucket page before we start
			 * with the actual replay operation.  This is to ensure that neither a
			 * scan can start nor a scan can be already-in-progress during the replay
			 * of this operation.  If we allow scans during this operation, then they
			 * can miss some records or show the same record multiple times.
			 */
			if (xldata->is_prim_bucket_same_wrt)
				action = XLogReadBufferForRedoExtended(record, 0, mode, true, &writebuf);
			else
			{
				action = XLogReadBufferForRedo(record, 0, &writebuf);
			}
			
			/* replay the record for adding entries in overflow buffer */
			if (action == BLK_NEEDS_REDO)
			{
				Page		writepage;
				char	   *begin;
				char	   *data;
				Size		datalen;
				uint16		ninserted = 0;
			
				data = begin = XLogRecGetBlockData(record, 0, &datalen);
			
				writepage = (Page) BufferGetPage(writebuf);
			
				if (xldata->ntups > 0)
				{
					/* block data: array of target offsets, then the tuples */
					OffsetNumber *towrite = (OffsetNumber *) data;
			
					data += sizeof(OffsetNumber) * xldata->ntups;
			
					while (data - begin < datalen)
					{
						IndexTuple	itup = (IndexTuple) data;
						Size		itemsz;
						OffsetNumber l;
			
						itemsz = IndexTupleSize(itup);
						itemsz = MAXALIGN(itemsz);
			
						data += itemsz;
			
						l = PageAddItem(writepage, (Item) itup, itemsz, towrite[ninserted], false, false);
						if (l == InvalidOffsetNumber)
							elog(ERROR, "hash_xlog_squeeze_page: failed to add item to hash index page, size %d bytes",
								 (int) itemsz);
			
						ninserted++;
					}
				}
			
				/*
				 * number of tuples inserted must be same as requested in REDO record.
				 */
				Assert(ninserted == xldata->ntups);
			
				/*
				 * if the page on which are adding tuples is a page previous to freed
				 * overflow page, then update its nextblkno.
				 */
				if (xldata->is_prev_bucket_same_wrt)
				{
					HashPageOpaque writeopaque = (HashPageOpaque) PageGetSpecialPointer(writepage);
			
					writeopaque->hasho_nextblkno = xldata->nextblkno;
				}
			
				PageSetLSN(writepage, lsn);
				MarkBufferDirty(writebuf);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(writebuf))
				UnlockReleaseBuffer(writebuf);
			break;
		case 2:
			/* replay the record for initializing overflow buffer */
			if (XLogReadBufferForRedo(record, 0, &ovflbuf) == BLK_NEEDS_REDO)
			{
				Page		ovflpage;
				HashPageOpaque ovflopaque;

				ovflpage = BufferGetPage(ovflbuf);

				/* reset the freed overflow page to an unused state */
				_hash_pageinit(ovflpage, BufferGetPageSize(ovflbuf));

				ovflopaque = (HashPageOpaque) PageGetSpecialPointer(ovflpage);

				ovflopaque->hasho_prevblkno = InvalidBlockNumber;
				ovflopaque->hasho_nextblkno = InvalidBlockNumber;
				ovflopaque->hasho_bucket = -1;
				ovflopaque->hasho_flag = LH_UNUSED_PAGE;
				ovflopaque->hasho_page_id = HASHO_PAGE_ID;

				PageSetLSN(ovflpage, lsn);
				MarkBufferDirty(ovflbuf);
			}
			if (BufferIsValid(ovflbuf))
				UnlockReleaseBuffer(ovflbuf);
			break;
		case 3:
			/* replay the record for page previous to the freed overflow page */
			if (!xldata->is_prev_bucket_same_wrt &&
				XLogReadBufferForRedo(record, 0, &prevbuf) == BLK_NEEDS_REDO)
			{
				Page		prevpage = BufferGetPage(prevbuf);
				HashPageOpaque prevopaque = (HashPageOpaque) PageGetSpecialPointer(prevpage);

				prevopaque->hasho_nextblkno = xldata->nextblkno;

				PageSetLSN(prevpage, lsn);
				MarkBufferDirty(prevbuf);
			}
			if (BufferIsValid(prevbuf))
				UnlockReleaseBuffer(prevbuf);
			break;
		case 4:
			/* replay the record for page next to the freed overflow page */
			if (XLogRecHasBlockRef(record, 0))
			{
				Buffer		nextbuf;

				if (XLogReadBufferForRedo(record, 0, &nextbuf) == BLK_NEEDS_REDO)
				{
					Page		nextpage = BufferGetPage(nextbuf);
					HashPageOpaque nextopaque = (HashPageOpaque) PageGetSpecialPointer(nextpage);

					nextopaque->hasho_prevblkno = xldata->prevblkno;

					PageSetLSN(nextpage, lsn);
					MarkBufferDirty(nextbuf);
				}
				if (BufferIsValid(nextbuf))
					UnlockReleaseBuffer(nextbuf);
			}		
			break;
		case 5:
			/*
			 * Note: in normal operation, we'd update the bitmap and meta page while
			 * still holding lock on the primary bucket page and overflow pages.  But
			 * during replay it's not necessary to hold those locks, since no other
			 * index updates can be happening concurrently.
			 */
			/* replay the record for bitmap page */
			if (XLogReadBufferForRedo(record, 0, &mapbuf) == BLK_NEEDS_REDO)
			{
				Page		mappage = (Page) BufferGetPage(mapbuf);
				uint32	   *freep = NULL;
				char	   *data;
				uint32	   *bitmap_page_bit;
				Size		datalen;

				freep = HashPageGetBitmap(mappage);

				/* block data carries the bit index to clear */
				data = XLogRecGetBlockData(record, 0, &datalen);
				bitmap_page_bit = (uint32 *) data;

				CLRBIT(freep, *bitmap_page_bit);

				PageSetLSN(mappage, lsn);
				MarkBufferDirty(mapbuf);
			}
			if (BufferIsValid(mapbuf))
				UnlockReleaseBuffer(mapbuf);
			break;
		case 6:
			/* replay the record for meta page */
			if (XLogRecHasBlockRef(record, 0))
			{
				Buffer		metabuf;

				if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
				{
					HashMetaPage metap;
					Page		page;
					char	   *data;
					uint32	   *firstfree_ovflpage;
					Size		datalen;

					data = XLogRecGetBlockData(record, 0, &datalen);
					firstfree_ovflpage = (uint32 *) data;

					page = BufferGetPage(metabuf);
					metap = HashPageGetMeta(page);
					metap->hashm_firstfree = *firstfree_ovflpage;

					PageSetLSN(page, lsn);
					MarkBufferDirty(metabuf);
				}
				if (BufferIsValid(metabuf))
					UnlockReleaseBuffer(metabuf);
			}	
			break;
	}
}

/*
 * replay delete operation of hash index
 */
/*
 * replay delete operation of hash index
 *
 * Record block numbers: 0 = primary bucket page (cleanup lock only),
 * 1 = page the tuples are deleted from.  If the record's own buffer is the
 * page being replayed it is reused in RBM_NORMAL_VALID mode and is not
 * unlocked/released here; the caller owns it.
 */
static void
he3hash_xlog_delete(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_delete *xldata = (xl_hash_delete *) XLogRecGetData(record);
	Buffer		bucketbuf = InvalidBuffer;
	Buffer		deletebuf = InvalidBuffer;
	Page		page;
	XLogRedoAction action;
	Buffer		buffer = InvalidBuffer;
	RelFileNode rnode;
	BlockNumber blkno;
	ForkNumber forknum;
	XLogRecGetBlockTag(record, 0, &rnode, &forknum, &blkno);

	ReadBufferMode mode = RBM_NORMAL;
	BufferTag *tag = record->tag;
	BufferTag oldtag;
	memcpy(&(oldtag.rnode),&rnode,sizeof(rnode));
	oldtag.forkNum = forknum;
	oldtag.blockNum = blkno;
	uint8 blocknum = XLogRecGetBlockNum(record);
	/* reuse the record's own buffer when it is the page being replayed */
	if (tag!= NULL && BUFFERTAGS_EQUAL(*tag,oldtag)) {
		mode = RBM_NORMAL_VALID;
		if (blocknum == 0) {
			bucketbuf = record->buffer;
		} else {
			deletebuf = record->buffer;
		}
	} 	
	switch(blocknum) {
		case 0:
			if (!xldata->is_primary_bucket_page) {
				/*
				 * we don't care for return value as the purpose of reading bucketbuf
				 * is to ensure a cleanup lock on primary bucket page.
				 */
				(void) XLogReadBufferForRedoExtended(record, 0, mode, true, &bucketbuf);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(bucketbuf))
				UnlockReleaseBuffer(bucketbuf);
			break;
		case 1:
			/*
			 * Ensure we have a cleanup lock on primary bucket page before we start
			 * with the actual replay operation.  This is to ensure that neither a
			 * scan can start nor a scan can be already-in-progress during the replay
			 * of this operation.  If we allow scans during this operation, then they
			 * can miss some records or show the same record multiple times.
			 */
			if (xldata->is_primary_bucket_page)
				action = XLogReadBufferForRedoExtended(record, 0, mode, true, &deletebuf);
			else
			{	
				action = XLogReadBufferForRedo(record, 0, &deletebuf);
			}
			/* replay the record for deleting entries in bucket page */
			if (action == BLK_NEEDS_REDO)
			{
				char	   *ptr;
				Size		len;

				/* block data is an array of dead item offsets */
				ptr = XLogRecGetBlockData(record, 0, &len);

				page = (Page) BufferGetPage(deletebuf);

				if (len > 0)
				{
					OffsetNumber *unused;
					OffsetNumber *unend;

					unused = (OffsetNumber *) ptr;
					unend = (OffsetNumber *) ((char *) ptr + len);

					if ((unend - unused) > 0)
						PageIndexMultiDelete(page, unused, unend - unused);
				}

				/*
				 * Mark the page as not containing any LP_DEAD items only if
				 * clear_dead_marking flag is set to true. See comments in
				 * hashbucketcleanup() for details.
				 */
				if (xldata->clear_dead_marking)
				{
					HashPageOpaque pageopaque;

					pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
					pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
				}

				PageSetLSN(page, lsn);
				MarkBufferDirty(deletebuf);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(deletebuf))
				UnlockReleaseBuffer(deletebuf);
			break;
	}
}

/*
 * replay split cleanup flag operation for primary bucket page.
 */
/*
 * replay split cleanup flag operation for primary bucket page.
 *
 * Simply clears LH_BUCKET_NEEDS_SPLIT_CLEANUP in the page's special space.
 */
static void
hash_xlog_split_cleanup(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	Buffer		buf;

	if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
	{
		Page		pg = (Page) BufferGetPage(buf);
		HashPageOpaque opaque = (HashPageOpaque) PageGetSpecialPointer(pg);

		opaque->hasho_flag &= ~LH_BUCKET_NEEDS_SPLIT_CLEANUP;

		PageSetLSN(pg, lsn);
		MarkBufferDirty(buf);
	}
	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}

/*
 * replay for update meta page
 */
/*
 * replay for update meta page
 *
 * Installs the tuple count carried by the record into the metapage.
 */
static void
hash_xlog_update_meta_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_update_meta_page *xldata = (xl_hash_update_meta_page *) XLogRecGetData(record);
	Buffer		buf;

	if (XLogReadBufferForRedo(record, 0, &buf) == BLK_NEEDS_REDO)
	{
		Page		pg = BufferGetPage(buf);
		HashMetaPage meta = HashPageGetMeta(pg);

		meta->hashm_ntuples = xldata->ntuples;

		PageSetLSN(pg, lsn);
		MarkBufferDirty(buf);
	}
	if (BufferIsValid(buf))
		UnlockReleaseBuffer(buf);
}

/*
 * replay delete operation in hash index to remove
 * tuples marked as DEAD during index tuple insertion.
 */
/*
 * replay delete operation in hash index to remove
 * tuples marked as DEAD during index tuple insertion.
 *
 * Record block numbers: 0 = index page whose dead tuples are removed
 * (resolving any hot-standby conflicts first), 1 = metapage tuple-count
 * update.  If the record's own buffer is the target page it is reused in
 * RBM_NORMAL_VALID mode and not unlocked/released here.
 */
static void
he3hash_xlog_vacuum_one_page(XLogReaderState *record)
{
	XLogRecPtr	lsn = record->EndRecPtr;
	xl_hash_vacuum_one_page *xldata;
	Buffer		buffer = InvalidBuffer;
	Buffer		metabuf;
	Page		page;
	XLogRedoAction action;
	HashPageOpaque pageopaque;

	xldata = (xl_hash_vacuum_one_page *) XLogRecGetData(record);
	RelFileNode rnode;
	BlockNumber blkno;
	ForkNumber forknum;
	XLogRecGetBlockTag(record, 0, &rnode, &forknum, &blkno);

	ReadBufferMode mode = RBM_NORMAL;
	BufferTag *tag = record->tag;
	BufferTag oldtag;
	memcpy(&(oldtag.rnode),&rnode,sizeof(rnode));
	oldtag.forkNum = forknum;
	oldtag.blockNum = blkno;
	/* reuse the record's own buffer when it is the page being replayed */
	if (tag!= NULL && BUFFERTAGS_EQUAL(*tag,oldtag)) {
		mode = RBM_NORMAL_VALID;
		buffer = record->buffer;
	} 
	uint8 blocknum = XLogRecGetBlockNum(record);
	switch(blocknum) {
		case 0:
			/*
			 * If we have any conflict processing to do, it must happen before we
			 * update the page.
			 *
			 * Hash index records that are marked as LP_DEAD and being removed during
			 * hash index tuple insertion can conflict with standby queries. You might
			 * think that vacuum records would conflict as well, but we've handled
			 * that already.  XLOG_HEAP2_PRUNE records provide the highest xid cleaned
			 * by the vacuum of the heap and so we can resolve any conflicts just once
			 * when that arrives.  After that we know that no conflicts exist from
			 * individual hash index vacuum records on that index.
			 */
			if (InHotStandby)
			{
				RelFileNode rnode;

				XLogRecGetBlockTag(record, 0, &rnode, NULL, NULL);
				ResolveRecoveryConflictWithSnapshot(xldata->latestRemovedXid, rnode);
			}

			action = XLogReadBufferForRedoExtended(record, 0, mode, true, &buffer);

			if (action == BLK_NEEDS_REDO)
			{
				page = (Page) BufferGetPage(buffer);

				/* offsets to delete, if any, follow the fixed-size struct */
				if (XLogRecGetDataLen(record) > SizeOfHashVacuumOnePage)
				{
					OffsetNumber *unused;

					unused = (OffsetNumber *) ((char *) xldata + SizeOfHashVacuumOnePage);

					PageIndexMultiDelete(page, unused, xldata->ntuples);
				}

				/*
				 * Mark the page as not containing any LP_DEAD items. See comments in
				 * _hash_vacuum_one_page() for details.
				 */
				pageopaque = (HashPageOpaque) PageGetSpecialPointer(page);
				pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

				PageSetLSN(page, lsn);
				MarkBufferDirty(buffer);
			}
			if (mode != RBM_NORMAL_VALID && BufferIsValid(buffer))
				UnlockReleaseBuffer(buffer);
			break;
		case 1:
			/* metapage: subtract the removed tuples from the count */
			if (XLogReadBufferForRedo(record, 0, &metabuf) == BLK_NEEDS_REDO)
			{
				Page		metapage;
				HashMetaPage metap;

				metapage = BufferGetPage(metabuf);
				metap = HashPageGetMeta(metapage);

				metap->hashm_ntuples -= xldata->ntuples;

				PageSetLSN(metapage, lsn);
				MarkBufferDirty(metabuf);
			}
			if (BufferIsValid(metabuf))
				UnlockReleaseBuffer(metabuf);
			break;
	}
}

/*
 * hash_redo -- entry point for replaying any hash-index WAL record.
 *
 * Dispatches on the record's info bits to the per-record-type replay
 * routine above.  Unknown opcodes are a PANIC: the WAL stream is corrupt
 * or the binary is out of sync with the WAL writer.
 */
void
hash_redo(XLogReaderState *record)
{
	uint8		info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;

	switch (info)
	{
		case XLOG_HASH_INIT_META_PAGE:
			hash_xlog_init_meta_page(record);
			break;
		case XLOG_HASH_INIT_BITMAP_PAGE:
			he3hash_xlog_init_bitmap_page(record);
			break;
		case XLOG_HASH_INSERT:
			he3hash_xlog_insert(record);
			break;
		case XLOG_HASH_ADD_OVFL_PAGE:
			he3hash_xlog_add_ovfl_page(record);
			break;
		case XLOG_HASH_SPLIT_ALLOCATE_PAGE:
			he3hash_xlog_split_allocate_page(record);
			break;
		case XLOG_HASH_SPLIT_PAGE:
			hash_xlog_split_page(record);
			break;
		case XLOG_HASH_SPLIT_COMPLETE:
			he3hash_xlog_split_complete(record);
			break;
		case XLOG_HASH_MOVE_PAGE_CONTENTS:
			he3hash_xlog_move_page_contents(record);
			break;
		case XLOG_HASH_SQUEEZE_PAGE:
			he3hash_xlog_squeeze_page(record);
			break;
		case XLOG_HASH_DELETE:
			he3hash_xlog_delete(record);
			break;
		case XLOG_HASH_SPLIT_CLEANUP:
			hash_xlog_split_cleanup(record);
			break;
		case XLOG_HASH_UPDATE_META_PAGE:
			hash_xlog_update_meta_page(record);
			break;
		case XLOG_HASH_VACUUM_ONE_PAGE:
			he3hash_xlog_vacuum_one_page(record);
			break;
		default:
			elog(PANIC, "hash_redo: unknown op code %u", info);
	}
}

/*
 * Mask a hash page before performing consistency checks on it.
 */
/*
 * Mask a hash page before performing consistency checks on it.
 */
void
hash_mask(char *pagedata, BlockNumber blkno)
{
	Page		page = (Page) pagedata;
	HashPageOpaque opaque;

	/* Common masking: LSN/checksum, hint bits, unused space. */
	mask_page_lsn_and_checksum(page);
	mask_page_hint_bits(page);
	mask_unused_space(page);

	opaque = (HashPageOpaque) PageGetSpecialPointer(page);

	switch (opaque->hasho_flag & LH_PAGE_TYPE)
	{
		case LH_UNUSED_PAGE:
			/* Mask everything on a UNUSED page. */
			mask_page_content(page);
			break;

		case LH_BUCKET_PAGE:
		case LH_OVERFLOW_PAGE:
			/*
			 * In hash bucket and overflow pages, it is possible to modify
			 * the LP_FLAGS without emitting any WAL record. Hence, mask the
			 * line pointer flags. See hashgettuple(), _hash_kill_items()
			 * for details.
			 */
			mask_lp_flags(page);
			break;

		default:
			/* other page types need no extra masking */
			break;
	}

	/*
	 * It is possible that the hint bit LH_PAGE_HAS_DEAD_TUPLES may remain
	 * unlogged. So, mask it. See _hash_kill_items() for details.
	 */
	opaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;
}
